/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * license agreements; and to You under the Apache License, version 2.0:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * This file is part of the Apache Pekko project, which was derived from Akka.
 */

/*
 * Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
 */

package org.apache.pekko.remote.artery

import java.net.InetAddress

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import org.apache.pekko
import pekko.actor.{ ActorSystem, Address }
import pekko.remote.classic.transport.netty.NettyTransportSpec._
import pekko.testkit.SocketUtil

trait BindCanonicalAddressBehaviors {
  this: AnyWordSpec with Matchers =>
  def arteryConnectionTest(transport: String, isUDP: Boolean): Unit = {

    val commonConfig = BindCanonicalAddressSpec.commonConfig(transport)

    "bind to a random port" in {
      val config = ConfigFactory.parseString(s"""
        pekko.remote.artery.canonical.port = 0
      """)

      implicit val sys = ActorSystem("sys", config.withFallback(commonConfig))

      getInternal() should contain(getExternal())
      Await.result(sys.terminate(), Duration.Inf)
    }

    "bind to a random port but remoting accepts from a specified port" in {
      val address = SocketUtil.temporaryServerAddress(InetAddress.getLocalHost.getHostAddress, udp = isUDP)

      val config = ConfigFactory.parseString(s"""
        pekko.remote.artery.canonical.port = ${address.getPort}
        pekko.remote.artery.bind.port = 0
      """)

      implicit val sys = ActorSystem("sys", config.withFallback(commonConfig))

      getExternal() should ===(address.toAkkaAddress("pekko"))
      // May have selected the same random port - bind another in that case while the other still has the canonical port
      val internals =
        if (getInternal().collect { case Address(_, _, _, Some(port)) => port }.toSeq.contains(address.getPort)) {
          val sys2 = ActorSystem("sys", config.withFallback(commonConfig))
          val secondInternals = getInternal()(sys2)
          Await.result(sys2.terminate(), Duration.Inf)
          secondInternals
        } else {
          getInternal()
        }
      internals should not contain address.toAkkaAddress("pekko")
      Await.result(sys.terminate(), Duration.Inf)
    }

    "bind to a specified bind hostname and remoting aspects from canonical hostname" in {
      val config = ConfigFactory.parseString(s"""
        pekko.remote.artery.canonical.port = 0
        pekko.remote.artery.canonical.hostname = "127.0.0.1"
        pekko.remote.artery.bind.hostname = "localhost"
      """)

      implicit val sys = ActorSystem("sys", config.withFallback(commonConfig))

      getInternal().flatMap(_.host) should contain("localhost")
      getExternal().host should contain("127.0.0.1")
    }

    "bind to a specified port and remoting accepts from a bound port" in {
      val address = SocketUtil.temporaryServerAddress(InetAddress.getLocalHost.getHostAddress, udp = isUDP)

      val config = ConfigFactory.parseString(s"""
        pekko.remote.artery.canonical.port = 0
        pekko.remote.artery.bind.port = ${address.getPort}
      """)

      implicit val sys = ActorSystem("sys", config.withFallback(commonConfig))

      getExternal() should ===(address.toAkkaAddress("pekko"))
      getInternal() should contain(address.toAkkaAddress("pekko"))
    }

    "bind to all interfaces" in {
      val config = ConfigFactory.parseString(s"""
        pekko.remote.artery.bind.hostname = "0.0.0.0"
      """)

      implicit val sys = ActorSystem("sys", config.withFallback(commonConfig))

      getInternal().flatMap(_.port) should contain(getExternal().port.get)
      getInternal().map(x => (x.host.get should include).regex("0.0.0.0".r)) // regexp dot is intentional to match IPv4 and 6 addresses

      Await.result(sys.terminate(), Duration.Inf)
    }
  }
}

class BindCanonicalAddressSpec extends AnyWordSpec with Matchers with BindCanonicalAddressBehaviors {
  s"artery with aeron-udp transport" should {
    behave.like(arteryConnectionTest("aeron-udp", isUDP = true))
  }
  s"artery with tcp transport" should {
    behave.like(arteryConnectionTest("tcp", isUDP = false))
  }
  s"artery with tls-tcp transport" should {
    behave.like(arteryConnectionTest("tls-tcp", isUDP = false))
  }
}

object BindCanonicalAddressSpec {
  def commonConfig(transport: String) = ConfigFactory.parseString(s"""
    pekko {
      actor.provider = remote
      remote.artery.enabled = true
      remote.artery.transport = "$transport"
    }
  """)
}
